package flinkstudy.stream.source;

import flinkstudy.Constant;
import flinkstudy.bo.EstateInfo;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Demonstrates the different ways of creating a Flink streaming data source.
 * <p>
 * Every test builds a source, prints the resulting stream and executes the job.
 *
 * @author daocr
 * @date 2019/12/14
 */
public class CreateSourceTest {

    /** Number of mock records generated by the in-memory source tests. */
    private static final int MOCK_RECORD_COUNT = 100;

    /** Charset name used whenever the estate CSV file is read as plain text. */
    private static final String CSV_CHARSET = "utf-8";

    StreamExecutionEnvironment env;

    /**
     * Obtains a fresh execution environment before each test.
     */
    @Before
    public void init() {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
    }

    /**
     * Reads the CSV file directly into {@link EstateInfo} POJOs through a
     * {@link PojoCsvInputFormat}.
     */
    @Test
    public void createInputCsv() throws Exception {

        env.setParallelism(1);

        // Extract the TypeInformation of EstateInfo; it is expected to be a PojoTypeInfo.
        PojoTypeInfo<EstateInfo> pojoType = (PojoTypeInfo<EstateInfo>) TypeExtractor.createTypeInfo(EstateInfo.class);
        // The field order obtained through reflection is not deterministic, so the order of
        // the columns in the file has to be declared explicitly.
        String[] fieldOrder = new String[]{"price", "cityId", "districtId", "plateId", "houseId", "dateTime"};

        // Build the PojoCsvInputFormat over the local CSV file.
        Path csvPath = Path.fromLocalFile(new File(Constant.FilePath.ESTATE_CSV_PATH));
        PojoCsvInputFormat<EstateInfo> csvInput = new PojoCsvInputFormat<>(csvPath, pojoType, fieldOrder);

        DataStreamSource<EstateInfo> input = env.createInput(csvInput, pojoType);
        input.print();
        env.execute("createInputCsv");
    }

    /**
     * Creates a source from an in-memory collection of objects.
     */
    @Test
    public void fromCollection() throws Exception {

        List<EstateInfo> list = mockEstateInfos(MOCK_RECORD_COUNT);

        DataStreamSource<EstateInfo> estateInfoDataStreamSource = env.fromCollection(list);

        estateInfoDataStreamSource.print();

        env.execute();
    }

    /**
     * Creates a source from in-memory objects passed as individual elements.
     */
    @Test
    public void fromElements() throws Exception {

        List<EstateInfo> list = mockEstateInfos(MOCK_RECORD_COUNT);

        // A zero-length seed array lets toArray allocate an exactly sized array and never
        // leaves a trailing null element, whatever the list size is.
        DataStreamSource<EstateInfo> estateInfoDataStreamSource = env.fromElements(list.toArray(new EstateInfo[0]));

        estateInfoDataStreamSource.print();

        env.execute();
    }

    /**
     * Reads the CSV file as text lines and maps every line to an {@link EstateInfo}.
     */
    @Test
    public void readTextFile() throws Exception {

        DataStreamSource<String> dataStreamSource = env.readTextFile(Constant.FilePath.ESTATE_CSV_PATH, CSV_CHARSET);

        SingleOutputStreamOperator<EstateInfo> map = dataStreamSource.map(line -> parseEstateInfo(line));

        map.print();

        env.execute();
    }

    /**
     * Reads the CSV file as text through {@code readFile} with an explicit processing mode.
     * <p>
     * Per the original author's notes: {@link FileProcessingMode#PROCESS_CONTINUOUSLY} keeps
     * monitoring the path for newly written content, while
     * {@link FileProcessingMode#PROCESS_ONCE} (used here) reads up to the last line and stops.
     *
     * @see FileProcessingMode#PROCESS_CONTINUOUSLY
     * @see FileProcessingMode#PROCESS_ONCE
     */
    @Test
    public void readFile() throws Exception {

        TextInputFormat textInputFormat = new TextInputFormat(new Path(Constant.FilePath.ESTATE_CSV_PATH));
        DataStreamSource<String> dataStreamSource = env.readFile(textInputFormat, Constant.FilePath.ESTATE_CSV_PATH, FileProcessingMode.PROCESS_ONCE, 100);

        dataStreamSource.print();

        env.execute();
    }

    /**
     * Creates a source from a custom {@link SourceFunction}.
     * <p>
     * A custom source function is the usual entry point for external or unbounded systems
     * (the original author mentions MySQL and Elasticsearch as examples); this test simply
     * replays the lines of the CSV file.
     *
     * @throws Exception if building or executing the job fails
     */
    @Test
    public void addSource() throws Exception {

        DataStreamSource<EstateInfo> estateInfoDataStreamSource = env.addSource(new EstateCsvSource());

        estateInfoDataStreamSource.print();

        env.execute();
    }

    /**
     * Builds {@code count} {@link EstateInfo} records filled with random ids and prices.
     *
     * @param count number of records to generate
     * @return a mutable list holding the generated records
     */
    private static List<EstateInfo> mockEstateInfos(int count) {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        List<EstateInfo> mocks = new ArrayList<>(count);

        for (int i = 0; i < count; i++) {
            EstateInfo mock = new EstateInfo();
            mock.setCityId(random.nextInt(10, 1000));
            mock.setDistrictId(random.nextInt(10, 1000));
            mock.setHouseId(random.nextInt(10, 1000));
            mock.setPlateId(random.nextInt(10, 1000));
            mock.setPrice(random.nextLong(1000000, 30000000));
            mocks.add(mock);
        }
        return mocks;
    }

    /**
     * Converts one comma separated line of the estate CSV file into an {@link EstateInfo}.
     * <p>
     * The column order follows the field order declared in {@link #createInputCsv()}:
     * price, cityId, districtId, plateId, houseId, date/time.
     * NOTE(review): the previous hand-written parsing read houseId from column 3 and plateId
     * from column 4, which contradicted that declaration - confirm against the CSV file.
     *
     * @param line a single line of the CSV file
     * @return the parsed record
     * @throws NumberFormatException          if a numeric column cannot be parsed
     * @throws ArrayIndexOutOfBoundsException if the line has fewer than six columns
     */
    private static EstateInfo parseEstateInfo(String line) {
        String[] columns = line.split(",");

        EstateInfo estateInfo = new EstateInfo();
        estateInfo.setPrice(Long.valueOf(columns[0]));
        estateInfo.setCityId(Integer.valueOf(columns[1]));
        estateInfo.setDistrictId(Integer.valueOf(columns[2]));
        estateInfo.setPlateId(Integer.valueOf(columns[3]));
        estateInfo.setHouseId(Integer.valueOf(columns[4]));
        estateInfo.setCreateTimeFormat(columns[5]);
        return estateInfo;
    }

    /**
     * Source function that emits one {@link EstateInfo} per line of the estate CSV file.
     * <p>
     * Declared as a static nested class so that the function does not hold a reference to
     * the enclosing test instance (and its execution environment).
     */
    private static final class EstateCsvSource implements SourceFunction<EstateInfo> {

        private static final long serialVersionUID = 1L;

        /** Cleared by {@link #cancel()}; volatile because cancel may be called from another thread. */
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<EstateInfo> ctx) throws Exception {
            // Read with an explicit charset instead of the platform default, matching readTextFile.
            List<String> lines = FileUtils.readLines(new File(Constant.FilePath.ESTATE_CSV_PATH), CSV_CHARSET);

            for (String line : lines) {
                if (!running) {
                    // The job was cancelled: stop emitting.
                    break;
                }
                ctx.collect(parseEstateInfo(line));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

}
